Refactor MMLSpark for Structured Streaming #134
Conversation
Force-pushed 6e55de4 to 5a0dfa8
Force-pushed f4e93cb to 20d048e
Force-pushed fc00299 to b87a9e6
@@ -1,29 +0,0 @@
// Copyright (C) Microsoft Corporation. All rights reserved.
Reviewer comment: Please keep these in their original places, since we'll be moving to Spark Images soon anyway.
.queryName("images")
.start()

Thread.sleep(3000)
Reviewer comment: Is there any way to check for the 6 images being found more directly than waiting for 3 seconds? What if it sometimes takes 4 seconds, or only 1 second (in which case you'd be blocking longer than needed)?
Reply: done.
// Copyright (C) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See LICENSE in project root for information.

package org.apache.spark.sql.execution.datasources.binary
Reviewer comment: consider putting this in an org.apache.spark.image package.
inputStream = fs.open(file)
rng.setSeed(filename.hashCode.toLong)
if (inspectZip) {
Reviewer comment: no nested ifs.
Reply: done.
class HadoopFileReader(file: PartitionedFile, conf: Configuration, subsample: Double, inspectZip: Boolean)
  extends Iterator[BytesWritable] with Closeable {

  Logger.getRootLogger.warn("reading " + file.filePath)
Reviewer comment: take this logging line out.
filteredPaths.map(_.getPath) ++ filteredDirs.flatMap(p => recursePath(fileSystem, p, pathFilter))
}

def streamUnstructured(ssc: StreamingContext, directory: String): InputDStream[(String, BytesWritable)] = {
Reviewer comment: consider removing support for non-structured streams.
Reply: done.
drdarshan left a comment:
Almost there... please also add a sample notebook, since this is a pretty epic change.
Force-pushed 234dc65 to 3883acf
MMLSpark 0.8.dev1+7.ge8535c7 is a new version build for GitHub PR #134.
/**
 * Thin wrapper class analogous to others in the spark ecosystem
 */
class HadoopFileReader(file: PartitionedFile, conf: Configuration, subsample: Double, inspectZip: Boolean)
Reviewer comment: make this a private class.
Reply: done.
def isBinaryFile(df: DataFrame, col: String): Boolean =
  df.schema(col).dataType == schema

def recursePath(fileSystem: FileSystem, path: Path, pathFilter: FileStatus => Boolean): Array[Path] = {
Reviewer comment: check for symlinks.
 * @param recursive Recursive search flag
 * @return DataFrame with a single column of "binaryFiles", see "columnSchema" for details
 */
def read(path: String, recursive: Boolean, spark: SparkSession,
Reviewer comment: make sure this works with the Python monkeypatch.
Reply: yep.
case Some(row) =>
  val imGenRow = new GenericInternalRow(1)
  // field layout follows ImageReader.columnSchema (the image schema)
  val genRow = new GenericInternalRow(ImageReader.columnSchema.fields.length)
  genRow.update(0, UTF8String.fromString(row.getString(0)))
Reviewer comment: add a comment to direct readers to the image schema.
 * @return returns None if decompression fails
 */
- private[spark] def decode(filename: String, bytes: Array[Byte]): Option[Row] = {
+ def decode(filename: String, bytes: Array[Byte]): Option[Row] = {
Reviewer comment: put this in the right namespace and make it private.
Reply: impossible.
Reviewer: OK!
    false
  }
} else {
  rng.setSeed(filename.hashCode.toLong)
Author reply: This RNG is used internally only, so it's not as if we're overriding a user-supplied RNG. I used this method because distributed, reproducible random splits are a very hard problem: we don't control the list of paths we load in, Spark provides it for us. The random split also needs to be robust to the partitioning strategy, which rules out a single RNG, since that would depend on the ordering. Here I chose to make the RNG depend on the filename, which is why it uses the filename as the seed. This allows for reproducibility, provided the filenames are the same. The randomness is preserved because the seeds will all be different (barring hash collisions), and when iterating through zip files the random seed is not reset every time. I realize now that the seed setting above seems redundant, but it's harmless, so I'll remove it and rely on the seed set in the init.
Yes, it's definitely a hack, but it's the least egregious hack I could think of, and it's fairly performant. I think the real way to do this might be to use the filters provided by the Catalyst optimizer, but that involves implementing an entire DSL of filters, and I'd be more than happy to investigate that in a further PR.
Force-pushed ad159a0 to ccfbee2
MMLSpark 0.8.dev2+3.gccfbee2 is a new version build for GitHub PR #134.
Force-pushed f28f3db to 544b32f
MMLSpark 0.8.dev2+3.g544b32f is a new version build for GitHub PR #134.
drdarshan left a comment:
Thank you for working through all the comments!
Force-pushed 3da51d6 to c620d9f
Force-pushed a47b0b3 to 65a3635
Force-pushed 65a3635 to e52aecf
MMLSpark 0.8.dev6+4.g65a3635 is a build for GitHub PR #134.